Skip to content

fix(scheduler): AsyncScheduler.flush() kills sibling actions and tears down subscriber chains on error#7599

Open
stewartmcgown wants to merge 2 commits intoReactiveX:7.xfrom
stewartmcgown:fix/async-scheduler-flush-upstream
Open

fix(scheduler): AsyncScheduler.flush() kills sibling actions and tears down subscriber chains on error#7599
stewartmcgown wants to merge 2 commits intoReactiveX:7.xfrom
stewartmcgown:fix/async-scheduler-flush-upstream

Conversation

@stewartmcgown
Copy link
Copy Markdown

Description

AsyncScheduler.flush() has two bugs in its error handling that were never fixed, even though the same class of bugs were fixed in AsapScheduler and AnimationFrameScheduler via #6674 and #7444.

Bug 1: Sibling actions are killed on error

When any action errors during a flush, the error handler unsubscribes ALL remaining queued actions:

// AsyncScheduler.flush() — before this PR
if (error) {
  while ((action = actions.shift()!)) {
    action.unsubscribe();  // kills every remaining action
  }
  throw error;
}

The erroring action already unsubscribes itself in AsyncAction._execute(). The remaining actions are independent operations that should not be affected.

Bug 2: Synchronous throw tears down subscriber chains

After killing siblings, flush() throws synchronously. For QueueScheduler (delay=0), this throw propagates back through the synchronous call stack: QueueAction.schedule()executeSchedule()OperatorSubscriber._next catch → destination.error() — tearing down the entire subscriber pipeline.

This is the root cause of NgRx "store death": observeOn(queueScheduler) queues independent subscriber notifications in the same flush cycle. When one errors, the throw propagates through the subscriber chain and unsubscribes the entire observeOn → withLatestFrom → scan → State pipeline.

The Fix

  1. Remove the blanket while (actions.shift()) { action.unsubscribe() } loop
  2. Replace throw error with reportUnhandledError(error) and continue flushing
 do {
   if ((error = action.execute(action.state, action.delay))) {
-    break;
+    reportUnhandledError(error);
+    error = null;
   }
 } while ((action = actions.shift()!));
 
 this._active = false;
-
-if (error) {
-  while ((action = actions.shift()!)) {
-    action.unsubscribe();
-  }
-  throw error;
-}

Errors surface asynchronously via config.onUnhandledError or setTimeout — consistent with how ConsumerObserver.next already handles subscriber errors in RxJS. The erroring action still self-cleans via _execute(). Remaining actions execute in the same flush.

Background

Test plan

  • Errors are reported via config.onUnhandledError, not thrown synchronously
  • Sibling actions are not unsubscribed when one errors
  • Sibling actions execute in the same flush (no deferred execution)
  • NgRx pattern: observeOn(queueScheduler) with two subscribers — one throws, both subscriptions survive, subsequent emissions still work
  • Scheduler is reusable after an error
  • All 123 scheduler tests pass
  • Full suite: 3243/3245 pass (2 pre-existing: ajax JSON format + WebSocket constructor on modern Node.js)

cc @benlesh @cartant @pmoleri @trxcllnt

Made with Cursor

…heduler flush

PR ReactiveX#6674 fixed AnimationFrameScheduler and AsapScheduler to scope flush
cycles using a flushId, so errors only kill actions belonging to the same
flush. AsyncScheduler (and QueueScheduler which extends it) was never
given the same fix. Its error handler blindly unsubscribed ALL remaining
queued actions when any single action threw.

This is the root cause of NgRx "store death": observeOn(queueScheduler)
queues independent subscriber notifications in the same flush cycle. When
one subscriber's handler throws, the error path destroyed every other
subscriber's pending action — permanently killing the store.

The erroring action already unsubscribes itself inside
AsyncAction._execute(). The remaining actions are independent operations
that should survive. Remove the blanket unsubscribe loop in the error
path.

Relates to ReactiveX#6672, ReactiveX#4690, ReactiveX#2697

Made-with: Cursor
…owing synchronously

When AsyncScheduler.flush() throws synchronously, the error propagates
back through the synchronous call stack — through QueueAction.schedule(),
executeSchedule(), and into the OperatorSubscriber that initiated the
flush. This tears down the entire subscriber chain, permanently killing
pipelines like NgRx's observeOn(queueScheduler) → scan → State.

Change flush() to use reportUnhandledError() instead of throw:
- Errors surface asynchronously via config.onUnhandledError or
  setTimeout (consistent with how RxJS handles unhandled subscriber errors)
- The synchronous subscriber chain is not torn down
- Remaining queued actions continue executing in the same flush
  (they are independent operations unaffected by a sibling's error)

The erroring action still unsubscribes itself immediately in _execute().

This is analogous to how ConsumerObserver.next already handles subscriber
errors — catch and report asynchronously rather than propagating
synchronously through the call stack.

Made-with: Cursor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants